package com.zxt.mq.consumer;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.zstk.frame.context.ZAppContextUtil;
import com.zxt.mq.config.rabbitmq.RabbitData;
import com.zxt.mq.mvc.log.receive.MessageReceiveLogService;
import com.zxt.mq.mvc.log.receive.zconstant.ReceiveLogConstant;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * <p>
 * Abstract base class for message consumers.
 * </p>
 *
 * <p>
 * Subclasses implement {@link #process(Message, Channel)}; this class records
 * the outcome through {@link MessageReceiveLogService} and, unless the
 * container reports an auto-ack mode, acknowledges or rejects the delivery on
 * the channel.
 * </p>
 *
 * @author zxt
 * @since 2022/7/22 11:47
 */
public abstract class BaseConsumer implements IDynamicConsumer {
    /** Set by {@link #shutdown()}; checked after every delivery to stop the container. */
    private volatile boolean                        end = false;
    private          SimpleMessageListenerContainer container;
    /** Cached result of {@code container.getAcknowledgeMode().isAutoAck()}. */
    private          boolean                        autoAck;

    private MessageReceiveLogService messageReceiveLogService = ZAppContextUtil.getBean(MessageReceiveLogService.class);

    /**
     * Stores the listener container and caches whether it runs in auto-ack mode.
     *
     * @param container the container this consumer is attached to
     */
    @Override
    public void setContainer(SimpleMessageListenerContainer container) {
        this.container = container;
        // NOTE(review): in Spring AMQP isAutoAck() appears to be true only for
        // AcknowledgeMode.NONE; with AcknowledgeMode.AUTO this class would still
        // ack manually in addition to the container - confirm the configured mode.
        autoAck = container.getAcknowledgeMode().isAutoAck();
    }

    /**
     * Requests a shutdown: the container is stopped once the next delivery has
     * been handled by {@link #onMessage(Message, Channel)}.
     */
    @Override
    public void shutdown() {
        end = true;
    }

    /**
     * Records the processing outcome and settles the delivery on the channel.
     * Does nothing when the container is in auto-ack mode.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     * @param success {@code true} to ack the message, {@code false} to nack it
     *                without requeueing
     *
     * @throws IOException if the ack/nack cannot be sent to the broker
     */
    protected void autoAck(Message message, Channel channel, boolean success) throws IOException {
        if (autoAck) {
            return;
        }

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String str = new String(message.getBody(), StandardCharsets.UTF_8);
            RabbitData rabbitData = JSONObject.parseObject(str, RabbitData.class);
            if (success) {
                messageReceiveLogService.saveLog(rabbitData, ReceiveLogConstant.CONFIRM);
            } else {
                messageReceiveLogService.saveLog(rabbitData, ReceiveLogConstant.UN_CONFIRM);
            }
        } finally {
            // Settle the delivery even when the body cannot be parsed or the log
            // cannot be saved; otherwise the message would stay unacknowledged.
            if (success) {
                // Second argument: if true, every message with a delivery tag
                // less than or equal to this one would be acknowledged as well.
                channel.basicAck(deliveryTag, false);
            } else {
                // Third argument: true requeues the message so it can be delivered
                // to another consumer, false removes it from the queue.
                // TODO: a requeued failed message may go back to the same position
                // in the queue, which can cause an endless redelivery loop and block
                // the queue. The intended solution is to store failed messages in
                // the database and retry them from there.
                channel.basicNack(deliveryTag, false, false);
            }
        }
    }

    /**
     * Handles one delivery: runs {@link #process(Message, Channel)}, settles the
     * message exactly once according to the outcome, and stops the container
     * afterwards if {@link #shutdown()} has been requested.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     *
     * @throws Exception the exception thrown by {@code process}, or a failure
     *                   while recording/acknowledging the outcome
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            boolean success;
            try {
                success = process(message, channel);
            } catch (Exception e) {
                try {
                    autoAck(message, channel, false);
                } catch (Exception ackFailure) {
                    // Keep the processing failure as the primary exception.
                    e.addSuppressed(ackFailure);
                }
                throw e;
            }
            // Outside the catch above on purpose: a failure while settling must
            // not trigger a second ack/nack for the same delivery tag.
            autoAck(message, channel, success);
        } finally {
            if (end) {
                // NOTE(review): this stops the container from the listener thread;
                // confirm this does not block waiting for this same consumer.
                container.stop();
            }
        }
    }

    /**
     * Processes a message received by the consumer.
     *
     * @param message the message object
     * @param channel the message channel
     *
     * @return the processing result; {@code true} leads to an ack,
     *         {@code false} to a nack
     */
    public abstract boolean process(Message message, Channel channel);
}
